{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "Custom Workloads with Dask Delayed\n",
    "==================================\n",
    "\n",
    "<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\" \n",
    "     width=\"30%\" \n",
    "     align=right\n",
    "     alt=\"Dask logo\">\n",
    "\n",
    "*Because not all problems are dataframes*\n",
    "\n",
    "This notebook shows using [dask.delayed](http://dask.pydata.org/en/latest/delayed.html) to parallelize generic Python code.  \n",
    "\n",
    "Dask.delayed is a simple and powerful way to parallelize existing code.  It allows users to delay function calls into a task graph with dependencies.  Dask.delayed doesn't provide any fancy parallel algorithms like Dask.dataframe, but it does give the user complete control over what they want to build.\n",
    "\n",
    "Systems like Dask.dataframe are built with Dask.delayed.  If you have a problem that is paralellizable, but isn't as simple as just a big array or a big dataframe, then dask.delayed may be the right choice for you."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start Dask Client for Dashboard\n",
    "\n",
    "Starting the Dask Client is optional.  It will provide a dashboard which \n",
    "is useful to gain insight on the computation.  \n",
    "\n",
    "The link to the dashboard will become visible when you create the client below.  We recommend having it open on one side of your screen while using your notebook on the other side.  This can take some effort to arrange your windows, but seeing them both at the same is very useful when learning."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client, progress\n",
    "client = Client(threads_per_worker=4, n_workers=1)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create simple functions\n",
    "\n",
    "These functions do simple operations like add two numbers together, but they sleep for a random amount of time to simulate real work."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import random\n",
    "\n",
    "def inc(x):\n",
    "    time.sleep(random.random())\n",
    "    return x + 1\n",
    "\n",
    "def dec(x):\n",
    "    time.sleep(random.random())\n",
    "    return x - 1\n",
    "    \n",
    "def add(x, y):\n",
    "    time.sleep(random.random())\n",
    "    return x + y "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can run them like normal Python functions below"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "x = inc(1)\n",
    "y = dec(2)\n",
    "z = add(x, y)\n",
    "z"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "These ran one after the other, in sequence.  Note though that the first two lines `inc(1)` and `dec(2)` don't depend on each other, we *could* have called them in parallel had we been clever."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Annotate functions with Dask Delayed to make them lazy\n",
    "\n",
    "We can call `dask.delayed` on our funtions to make them lazy.  Rather than compute their results immediately, they record what we want to compute as a task into a graph that we'll run later on parallel hardware."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask\n",
    "inc = dask.delayed(inc)\n",
    "dec = dask.delayed(dec)\n",
    "add = dask.delayed(add)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Calling these lazy functions is now almost free.  We're just constructing a graph"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "x = inc(1)\n",
    "y = dec(2)\n",
    "z = add(x, y)\n",
    "z"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Visualize computation\n",
    "\n",
    "You will need graphviz installed for this to work"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "z.visualize(rankdir='LR')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run in parallel\n",
    "\n",
    "Call `.compute()` when you want your result as a normal Python object\n",
    "\n",
    "If you started `Client()` above then you may want to watch the status page during computation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "z.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parallelize Normal Python code\n",
    "\n",
    "Now we use Dask in normal for-loopy Python code.  This generates graphs instead of doing computations directly, but still looks like the code we had before.  Dask is a convenient way to add parallelism to existing workflows."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "zs = []"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "for i in range(256):\n",
    "    x = inc(i)\n",
    "    y = dec(x)\n",
    "    z = add(x, y)\n",
    "    zs.append(z)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "zs = dask.persist(*zs)  # trigger computation in the background"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make this go faster, add additional workers.\n",
    "\n",
    "(although we're still only working on our local machine, this is more practical when using an actual cluster)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.cluster.scale(10)  # ask for ten 4-thread workers"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "By looking at the Dask dashboard we can see that Dask spreads this work around our cluster, managing load balancing, dependencies, etc.."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Custom computation: Tree summation\n",
    "\n",
    "As an example of a non-trivial algorithm, consider the classic tree reduction.  We accomplish this with a nested for loop and a bit of normal Python logic.\n",
    "\n",
    "```\n",
    "finish           total             single output\n",
    "    ^          /        \\\n",
    "    |        c1          c2        neighbors merge\n",
    "    |       /  \\        /  \\\n",
    "    |     b1    b2    b3    b4     neighbors merge\n",
    "    ^    / \\   / \\   / \\   / \\\n",
    "start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "L = zs\n",
    "while len(L) > 1:\n",
    "    new_L = []\n",
    "    for i in range(0, len(L), 2):\n",
    "        lazy = add(L[i], L[i + 1])  # add neighbors\n",
    "        new_L.append(lazy)\n",
    "    L = new_L                       # swap old list for new\n",
    "\n",
    "dask.compute(L)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you're watching the [dashboard's status page](../proxy/8787/status) then you may want to note two things:\n",
    "\n",
    "1.  The red bars are for inter-worker communication.  They happen as different workers need to combine their intermediate values\n",
    "2.  There is lots of parallelism at the beginning but less towards the end as we reach the top of the tree where there is less work to do.\n",
    "\n",
    "Alternativley you may want to navigate to the [dashboard's graph page](../proxy/8787/graph) and then run the cell above again.  You will be able to see the task graph evolve during the computation."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Further Reading\n",
    "\n",
    "For a more in-depth introdution to delayed and lazy operation in Dask, see the [dask tutorial](https://github.com/dask/dask-tutorial), notebooks 01 and 01x."
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
